package cloud.xiguapi.ubas.analysis.uv.model;

import cloud.xiguapi.ubas.model.UserBehavior;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.HashSet;

/**
 * @author 大大大西西瓜皮🍉
 * date: 2021-5-19 上午 09:38
 * desc:
 */
public class SetUvCountResult implements AllWindowFunction<UserBehavior, UVResult, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<UVResult> out) throws Exception {
        // 定义一个Set, 保存窗口中的所有的userId, 自动去重
        HashSet<Long> uidSet = new HashSet<>();
        values.forEach(value -> uidSet.add(value.getUserId()));
        out.collect(UVResult.builder()
                .windowEnd(new Timestamp(window.getEnd()).toString())
                .count((long) uidSet.size())
                .build());
    }
}